package main

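// Publish/subscribe test.
// Intended flow: receive messages pushed by the business server, sort and filter
// the received messages, then push them to online clients. Only the receiving
// logic is written here.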
import (
	"bytes"
	"fmt"
	"google.golang.org/protobuf/proto"
	"src/connection"
	"src/model/ProtoModel"
	"src/service"
	"time"
)

// Package-level counters shared by the publish and subscribe test paths.
var (
	// idx is the sequence number of the message being published; pub
	// increments it and packetMessage copies it into the message's Cid.
	idx int64
	// starttime is the Unix time (seconds) at which red saw its first
	// message; zero means nothing has been received yet.
	starttime int64
	// num counts the messages handled by red.
	num int64
)

// packetMessage builds the sample publish message used by the tests.
//
// The message's Cid is the current value of the package-level idx counter and
// its Msg list holds two fixed placeholder entries.
func packetMessage() *ProtoModel.MessagesPubProto {
	out := new(ProtoModel.MessagesPubProto)
	out.Cid = idx
	out.Msg = append(out.Msg,
		&ProtoModel.MessageProto{Action: "XdfdsfdsdfdsX", Image: "1"},
		&ProtoModel.MessageProto{Action: "444444444444X", Image: "2"},
	)
	return out
}

// unpackMessage decodes a subscription frame.
//
// The frame layout is: one type byte, one data-type byte, then a
// protobuf-encoded ProtoModel.MessagesPubProto occupying the rest of byte2.
//
// It returns the two header bytes and the decoded message. On any failure
// (frame shorter than the two-byte header, protobuf decoding error, or a panic
// raised while decoding) it returns zero header bytes, a nil message and a
// non-nil error, so callers never receive a nil message together with a nil
// error.
func unpackMessage(byte2 []byte) (tp byte, dttype byte, msg *ProtoModel.MessagesPubProto, err error) {
	defer func() {
		// Turn a panic into an ordinary error; the recovered value is
		// whatever was passed to panic.
		if r := recover(); r != nil {
			tp, dttype, msg = 0, 0, nil
			err = fmt.Errorf("unpack message: recovered from panic: %v", r)
		}
	}()

	buf := bytes.NewBuffer(byte2)

	if tp, err = buf.ReadByte(); err != nil {
		return 0, 0, nil, fmt.Errorf("unpack message: read type byte: %w", err)
	}
	if dttype, err = buf.ReadByte(); err != nil {
		return 0, 0, nil, fmt.Errorf("unpack message: read data-type byte: %w", err)
	}

	// Everything after the two header bytes is the protobuf payload.
	data := &ProtoModel.MessagesPubProto{}
	if err = proto.Unmarshal(buf.Bytes(), data); err != nil {
		return 0, 0, nil, fmt.Errorf("unpack message: decode payload: %w", err)
	}
	return tp, dttype, data, nil
}

// red is the callback invoked for every received subscription message.
//
// It bumps the received-message counter num, records the Unix time of the
// first message in starttime, and prints a progress line whenever either the
// received count or the message's Cid is a multiple of 10000. cmd and act are
// accepted but not used.
func red(cmd byte, act byte, msg *ProtoModel.MessagesPubProto) {
	num++
	if starttime == 0 {
		starttime = time.Now().Unix()
	}

	sendID := msg.Cid
	if num%10000 != 0 && sendID%10000 != 0 {
		return
	}

	// Elapsed whole seconds since the first message was received.
	elapsed := time.Now().Unix() - starttime
	fmt.Println("send-id", sendID, ";read-id:", num, ";time:", starttime, ";耗时:", elapsed)
}

// subpub is a debugging loop for the pack/unpack round trip: it packs a sample
// message into a service.Sub, unpacks the resulting buffer, and hands the
// result to red. It returns only when unpacking fails.
func subpub() {
	for {
		packer := service.Sub{}
		packer.Message(packetMessage())

		cmd, act, msgs, err := unpackMessage(packer.Buffer.Bytes())
		if err != nil {
			fmt.Println(cmd, act, msgs, err)
			return
		}
		red(cmd, act, msgs)

		// NOTE(review): idx is not modified inside this loop, so while idx is
		// a multiple of 101000 (including its initial 0) every iteration
		// sleeps for 100 seconds — confirm this is intended.
		if idx%101000 != 0 {
			continue
		}
		time.Sleep(100 * time.Second)
	}
}

// TestCallback1 is the subscription callback: it unpacks the raw payload msg
// and forwards the decoded message to red. If unpacking fails it prints the
// returned values together with the error and drops the message. The channel
// name chann is not used.
func TestCallback1(chann string, msg []byte) {
	tp, dttype, decoded, err := unpackMessage(msg)
	if err != nil {
		fmt.Println(tp, dttype, decoded, err)
		return
	}

	red(tp, dttype, decoded)
}

// sub subscribes to the "chat" channel with TestCallback1 as the message
// callback.
//
// If the subscription call panics, the panic value is printed and the
// subscription is started again. The retry is a loop rather than a recursive
// call from inside the deferred recover handler, so repeated failures do not
// keep nesting stack frames. sub returns once a subscription call returns
// without panicking.
func sub() {
	// subscribe runs one subscription attempt and reports whether it panicked.
	subscribe := func() (panicked bool) {
		defer func() {
			// The recovered value is whatever was passed to panic.
			if err := recover(); err != nil {
				fmt.Println(err)
				panicked = true
			}
		}()
		connection.RedisPool.PubSubConn("chat", TestCallback1)
		return false
	}

	for subscribe() {
		// Retry immediately after a recovered panic.
	}
}
// pub publishes sample messages in an endless loop, incrementing the
// package-level idx counter for each one.
//
// After every 101000th message it pauses for 100 seconds. If sending returns an
// error, the error is printed and pub returns. If a panic occurs, idx is rolled
// back by one, the panic value is printed, and publishing resumes. The retry is
// a loop rather than a recursive call from inside the deferred recover handler,
// so repeated failures do not keep nesting stack frames.
func pub() {
	// publish runs the send loop until it fails and reports whether it
	// stopped because of a panic.
	publish := func() (panicked bool) {
		defer func() {
			// The recovered value is whatever was passed to panic.
			if err := recover(); err != nil {
				// Undo the increment so the next attempt reuses this
				// sequence number.
				idx--
				fmt.Println(err)
				panicked = true
			}
		}()

		for {
			idx++
			sub1 := service.Sub{Sid: "xxfd", Uid: "1084", Cid: idx}
			sub1.Message(packetMessage())

			if err := sub1.SendMessage(); err != nil {
				fmt.Println("pub: send message:", err)
				return false
			}
			// Author's measurement: using send is about 5x faster than do —
			// roughly 925k messages in 10 seconds (about 92k/s) versus about
			// 160k in 10 seconds with do.
			if idx%101000 == 0 {
				time.Sleep(100 * time.Second)
			}
		}
	}

	for publish() {
		// Resume immediately after a recovered panic.
	}
}

// main initialises the Redis connection, waits for the wall clock to roll over
// to a new second, then starts the subscriber in a goroutine and runs the
// publisher on the main goroutine.
func main() {
	connection.InIt() // initialise the Redis connection
	connection.NewRedisClient()

	// Spin until the Unix-seconds value observed on the previous iteration is
	// smaller than the current one, i.e. a second boundary has just passed.
	var lastSeen int64
	for {
		rolledOver := lastSeen != 0 && lastSeen < time.Now().Unix()
		lastSeen = time.Now().Unix()
		if rolledOver {
			break
		}
	}
	fmt.Println("starttime", lastSeen)

	go sub()
	pub()
}
